// bdlcc_boundedqueue.h                                               -*-C++-*-

#ifndef INCLUDED_BDLCC_BOUNDEDQUEUE
#define INCLUDED_BDLCC_BOUNDEDQUEUE

#include <bsls_ident.h>
BSLS_IDENT("$Id: $")

//@PURPOSE: Provide a thread-aware bounded queue of values.
//
//@CLASSES:
//  bdlcc::BoundedQueue: thread-aware bounded queue of `TYPE`
//
//@SEE_ALSO: bdlcc_fixedqueue
//
//@DESCRIPTION: This component defines a type, `bdlcc::BoundedQueue`, that
// provides an efficient, thread-aware bounded (capacity fixed at construction)
// queue of values.  This class is ideal for synchronization and communication
// between threads in a producer-consumer model when a bounded queue is
// appropriate.   Under most cicrumstances developers should prefer
// this component to the older {bdlcc_fixedqueue} (see {Comparison to
// FixedQueue}).
//
// The queue provides `pushBack` and `popFront` methods for pushing data into
// the queue and popping data from the queue.  When the queue is full, the
// `pushBack` methods block until data is removed from the queue.  When the
// queue is empty, the `popFront` methods block until data appears in the
// queue.  Non-blocking methods `tryPushBack` and `tryPopFront` are also
// provided.  The `tryPushBack` method fails immediately, returning a non-zero
// value, if the queue is full.  The `tryPopFront` method fails immediately,
// returning a non-zero value, if the queue is empty.
//
// The queue may be placed into a "enqueue disabled" state using the
// `disablePushBack` method.  When disabled, `pushBack` and `tryPushBack` fail
// immediately and return an error code.  Any threads blocked in `pushBack`
// when the queue is enqueue disabled return from `pushBack` immediately and
// return an error code.  The queue may be restored to normal operation with
// the `enablePushBack` method.
//
// The queue may be placed into a "dequeue disabled" state using the
// `disablePopFront` method.  When dequeue disabled, `popFront` and
// `tryPopFront` fail immediately and return an error code.  Any threads
// blocked in `popFront` when the queue is dequeue disabled return from
// `popFront` immediately and return an error code.  The queue may be restored
// to normal operation with the `enablePopFront` method.
//
///Comparison To FixedQueue
///------------------------
// Both `bdlcc::FixedQueue` and `bdlcc::BoundedQueue` provide thread-aware
// bounded queues.  Under most circumstances developers should prefer
// {bdlcc_boundedqueue}: it is newer, has additional features, and provides
// better performance under most circumstances.  `bdlcc::BoundedQueue` is not
// quite a drop in replacement for `bdlcc::FixedQueue` so both types are
// currently maintained.  There is additional information about
// performance of various queues in the article Concurrent Queue Evaluation
// (https://tinyurl.com/mr2un9f7).
//
///Template Requirements
///---------------------
// `bdlcc::BoundedQueue` is a template that is parameterized on the type of
// element contained within the queue.  The supplied template argument, `TYPE`,
// must provide both a default constructor and a copy constructor, as well as
// an assignment operator.  If the default constructor accepts a
// `bslma::Allocator *`, `TYPE` must declare the uses `bslma::Allocator` trait
// (see `bslma_usesbslmaallocator`) so that the allocator of the queue is
// propagated to the elements contained in the queue.
//
///Exception Safety
///----------------
// A `bdlcc::BoundedQueue` is exception neutral, and all of the methods of
// `bdlcc::BoundedQueue` provide the basic exception safety guarantee (see
// `bsldoc_glossary`).  If an exception occurs while writing to an element, the
// element is marked unusable until after a read attempt from the element (at
// which point the element is "reclaimed").  This failure to write does not
// increment the result returned by `numElements`.  Hence,
// `numElements() == capacity()` is not a valid replacement for `isFull()`.
//
///Move Semantics in C++03
///-----------------------
// Move-only types are supported by `bdlcc::BoundedQueue` on C++11 platforms
// only (where `BSLMF_MOVABLEREF_USES_RVALUE_REFERENCES` is defined), and are
// not supported on C++03 platforms.  Unfortunately, in C++03, there are user
// types where a `bslmf::MovableRef` will not safely degrade to a lvalue
// reference when a move constructor is not available (types providing a
// constructor template taking any type), so `bslmf::MovableRefUtil::move`
// cannot be used directly on a user supplied template type.  See internal bug
// report 99039150 for more information.
//
///Usage
///-----
// This section illustrates intended use of this component.
//
///Example 1: A Simple Thread Pool
///- - - - - - - - - - - - - - - -
// In the following example a `bdlcc::BoundedQueue` is used to communicate
// between a single "producer" thread and multiple "consumer" threads.  The
// "producer" will push work requests onto the queue, and each "consumer" will
// iteratively take a work request from the queue and service the request.
// This example shows a partial, simplified implementation of the
// `bdlmt::FixedThreadPool` class.  See component `bdlmt_fixedthreadpool` for
// more information.
//
// First, we define a utility classes that handles a simple "work item":
// ```
// /// Work data...
// struct my_WorkData {
// };
//
// struct my_WorkRequest {
//     enum RequestType {
//         e_WORK = 1,
//         e_STOP = 2
//     };
//
//     RequestType d_type;
//     my_WorkData d_data;
//     // Work data...
// };
// ```
// Next, we provide a simple function to service an individual work item.  The
// details are unimportant for this example:
// ```
// /// Do some work based upon the specified `data`.
// void myDoWork(const my_WorkData& data)
// {
//     // do some stuff...
//     (void)data;
// }
// ```
// Then, we define a `myConsumer` function that will pop elements off the queue
// and process them.  Note that the call to `queue->popFront()` will block
// until there is an element available on the queue.  This function will be
// executed in multiple threads, so that each thread waits in
// `queue->popFront()`, and `bdlcc::BoundedQueue` guarantees that each thread
// gets a unique element from the queue:
// ```
// /// Pop elements from the specified `queue`.
// void myConsumer(bdlcc::BoundedQueue<my_WorkRequest> *queue)
// {
//     while (1) {
//         // `popFront()` will wait for a `my_WorkRequest` until available.
//
//         my_WorkRequest item;
//         item.d_type = my_WorkRequest::e_WORK;
//
//         assert(0 == queue->popFront(&item));
//
//         if (item.d_type == my_WorkRequest::e_STOP) { break; }
//         myDoWork(item.d_data);
//     }
// }
// ```
// Finally, we define a `myProducer` function that serves multiple roles: it
// creates the `bdlcc::BoundedQueue`, starts the consumer threads, and then
// produces and enqueues work items.  When work requests are exhausted, this
// function enqueues one `e_STOP` item for each consumer queue.  This `e_STOP`
// item indicates to the consumer thread to terminate its thread-handling
// function.
//
// Note that, although the producer cannot control which thread `pop`s a
// particular work item, it can rely on the knowledge that each consumer thread
// will read a single `e_STOP` item and then terminate.
// ```
// /// Create a queue, start the specified `numThreads` consumer threads,
// /// produce and enqueue work.
// void myProducer(int numThreads)
// {
//     enum {
//         k_MAX_QUEUE_LENGTH = 100,
//         k_NUM_WORK_ITEMS   = 1000
//     };
//
//     bdlcc::BoundedQueue<my_WorkRequest> queue(k_MAX_QUEUE_LENGTH);
//
//     bslmt::ThreadGroup consumerThreads;
//     consumerThreads.addThreads(bdlf::BindUtil::bind(&myConsumer, &queue),
//                                numThreads);
//
//     for (int i = 0; i < k_NUM_WORK_ITEMS; ++i) {
//         my_WorkRequest item;
//         item.d_type = my_WorkRequest::e_WORK;
//         item.d_data = my_WorkData(); // some stuff to do
//         queue.pushBack(item);
//     }
//
//     for (int i = 0; i < numThreads; ++i) {
//         my_WorkRequest item;
//         item.d_type = my_WorkRequest::e_STOP;
//         queue.pushBack(item);
//     }
//
//     consumerThreads.joinAll();
// }
// ```

#include <bdlscm_version.h>

#include <bdlb_bitutil.h>

#include <bslalg_scalarprimitives.h>

#include <bslma_usesbslmaallocator.h>

#include <bslmf_isbitwisecopyable.h>
#include <bslmf_movableref.h>
#include <bslmf_nestedtraitdeclaration.h>

#include <bslmt_condition.h>
#include <bslmt_fastpostsemaphore.h>
#include <bslmt_lockguard.h>
#include <bslmt_mutex.h>

#include <bsls_assert.h>
#include <bsls_atomicoperations.h>
#include <bsls_objectbuffer.h>
#include <bsls_types.h>

#include <bsl_climits.h>
#include <bsl_cstdint.h>

namespace BloombergLP {
namespace bdlcc {

                   // ===================================
                   // class BoundedQueue_PopCompleteGuard
                   // ===================================

/// This class implements a guard that invokes `TYPE::popComplete` on a `NODE`
/// upon destruction.
template <class TYPE, class NODE>
class BoundedQueue_PopCompleteGuard {

    // DATA
    TYPE *d_queue_p;  // managed queue owning the managed node
    NODE *d_node_p;   // managed node

    // NOT IMPLEMENTED
    BoundedQueue_PopCompleteGuard();
    BoundedQueue_PopCompleteGuard(const BoundedQueue_PopCompleteGuard&);
    BoundedQueue_PopCompleteGuard& operator=(
                                         const BoundedQueue_PopCompleteGuard&);

  public:
    // CREATORS

    /// Create a `popComplete` guard managing the specified `queue` and `node`.
    BoundedQueue_PopCompleteGuard(TYPE *queue, NODE *node);

    /// Destroy this object and invoke the `TYPE::popComplete` method with the
    /// managed `node`.
    ~BoundedQueue_PopCompleteGuard();
};

             // ===============================================
             // class BoundedQueue_PushExceptionCompleteProctor
             // ===============================================

/// This class implements a proctor that invokes `TYPE::pushExceptionComplete`
/// upon destruction unless `release` has been called.
template <class TYPE>
class BoundedQueue_PushExceptionCompleteProctor {

    // DATA
    TYPE *d_queue_p;  // managed queue

    // NOT IMPLEMENTED
    BoundedQueue_PushExceptionCompleteProctor();
    BoundedQueue_PushExceptionCompleteProctor(
                             const BoundedQueue_PushExceptionCompleteProctor&);
    BoundedQueue_PushExceptionCompleteProctor& operator=(
                             const BoundedQueue_PushExceptionCompleteProctor&);

  public:
    // CREATORS

    /// Create a `pushExceptionComplete` proctor that conditionally manages the
    /// specified `queue` (if non-zero).
    explicit
    BoundedQueue_PushExceptionCompleteProctor(TYPE *queue);

    /// Destroy this object and, if `release` has not been invoked', invoke the
    /// managed queue's `pushExceptionComplete` method.
    ~BoundedQueue_PushExceptionCompleteProctor();

    // MANIPULATORS

    /// Release from management the queue currently managed by this proctor.
    /// If no queue is currently managed, this method has no effect.
    void release();
};

                         // ========================
                         // struct BoundedQueue_Node
                         // ========================

/// This class implements the queue's node.  A node stores an instance of the
/// specified (template parameter) `TYPE`, and provides an accessor
/// `isUnconstructed` that indicates whether the value of the node was
/// correctly constructed.  If `isUnconstructed` is `false`, then the value
/// (`d_value`) refers to a valid object.  If `isUnconstructed` is `true` then
/// `d_value` does not refer to a valid object, it does not represent a value
/// in this queue, and the destructor of `d_value` should not be called.  The
/// specified (template parameter) type `RECLAIMABLE` is used to provide a
/// compile time optimization for the footprint of this template when the value
/// of `isUnconstructed` is known at compile-time.  If `RECLAIMABLE` is `false`
/// then it can be determined at compile time that the construction of `TYPE`
/// will uncoditionally succeed (e.g., it `IsBitwiseCopyable`), and the
/// `isUnconstructed` property does not require a data member to be accessed at
/// run-time.
template <class TYPE, bool RECLAIMABLE>
struct BoundedQueue_Node;

template <class TYPE>
struct BoundedQueue_Node<TYPE, true> {
  private:
    // DATA
    bool                     d_isUnconstructedFlag;  // node suffered exception

  public:
    // PUBLIC DATA
    bsls::ObjectBuffer<TYPE> d_value;                // stored value

    // MANIPULATORS

    /// If the specified `isUnconstructedFlag` is `false`, then `d_value`
    /// refers to a valid object of (the template parameter) `TYPE`, otherwise
    /// (if `isUnconstrucedFlag` is `true`) `d_value` does not refer to a valid
    /// object (because the attempt to construct `TYPE` resulted in an
    /// expection).  Note that this method is normally invoked after each write
    /// to `d_value`.
    void setIsUnconstructed(bool isUnconstructedFlag);

    // ACCESSORS

    /// Return whether an exception occurred when last writing to `d_value`.
    bool isUnconstructed() const;
};

template <class TYPE>
struct BoundedQueue_Node<TYPE, false> {
    // PUBLIC DATA
    bsls::ObjectBuffer<TYPE> d_value;    // stored value

    // MANIPULATORS

    /// Do nothing.
    void setIsUnconstructed(bool /* value */);

    // ACCESSORS

    /// Return `false`.
    bool isUnconstructed() const;
};

                            // ==================
                            // class BoundedQueue
                            // ==================

/// This class provides a thread-safe bounded queue of values.
template <class TYPE>
class BoundedQueue {

    // PRIVATE CONSTANTS

    // The following constants are used to maintain the queue's `d_popCount`
    // and `d_pushCount` values.  See *Implementation* *Note* for details.

    static const bsls::Types::Uint64 k_STARTED_MASK   = 0x00000000ffffffffLL;
    static const bsls::Types::Uint64 k_STARTED_INC    = 0x0000000000000001LL;
    static const bsls::Types::Uint64 k_STARTED_DEC    = 0xffffffffffffffffLL;
    static const bsls::Types::Uint64 k_FINISHED_INC   = 0x0000000100000000LL;
    static const unsigned int        k_FINISHED_SHIFT = 32;

    static const unsigned int k_MAXIMUM_CIRCULAR_DIFFERENCE =
         static_cast<unsigned int>(1) << (sizeof(unsigned int) * CHAR_BIT - 1);

    // PRIVATE TYPES
    typedef unsigned int                                         Uint;
    typedef typename bsls::Types::Uint64                         Uint64;
    typedef typename bsls::AtomicOperations::AtomicTypes::Uint   AtomicUint;
    typedef typename bsls::AtomicOperations::AtomicTypes::Uint64 AtomicUint64;

    typedef typename bsls::AtomicOperations AtomicOp;

    typedef BoundedQueue_Node<TYPE,
                              !bslmf::IsBitwiseCopyable<TYPE>::value> Node;

    // DATA
    bslmt::FastPostSemaphore  d_pushSemaphore;     // synchronization primitive
                                                   // restricting access to
                                                   // empty elements and
                                                   // providing
                                                   // enablement/disablement of
                                                   // "push" operations

    AtomicUint64              d_pushCount;         // count of "push"
                                                   // operations started and
                                                   // completed, used to detect
                                                   // a quiescent state for
                                                   // safely incrementing
                                                   // number of available
                                                   // elements

    AtomicUint64              d_pushIndex;         // index of next enqueue
                                                   // element location

    bslmt::FastPostSemaphore  d_popSemaphore;      // synchronization primitive
                                                   // restricting access to
                                                   // available elements and
                                                   // providing
                                                   // enablement/disablement of
                                                   // "pop" operations

    AtomicUint64              d_popCount;          // count of "pop" operations
                                                   // started and completed,
                                                   // used to detect a
                                                   // quiescent state for
                                                   // safely incrementing
                                                   // number of empty elements

    AtomicUint64              d_popIndex;          // index of next dequeue
                                                   // element location

    mutable AtomicUint        d_emptyWaiterCount;  // circular count of
                                                   // `waitUntilEmpty`
                                                   // invocations

    AtomicUint                d_emptyCountSeen;    // maximum
                                                   // `d_emptyWaiterCount` seen
                                                   // prior to the queue being
                                                   // observed as empty; used
                                                   // to detect most very short
                                                   // lived transitions to the
                                                   // queue being empty

    mutable bslmt::Mutex      d_emptyMutex;        // blocking point for
                                                   // `waitUntilEmpty`

    mutable bslmt::Condition  d_emptyCondition;    // condition variable for
                                                   // `waitUntilEmpty`

    Node                     *d_element_p;         // array of elements that
                                                   // comprise the bounded
                                                   // queue

    const Uint64              d_capacity;          // capacity of the queue

    bslma::Allocator         *d_allocator_p;       // allocator, held not owned

    // FRIENDS
    friend class BoundedQueue_PopCompleteGuard<
                                            BoundedQueue<TYPE>,
                                            typename BoundedQueue<TYPE>::Node>;

    friend class BoundedQueue_PushExceptionCompleteProctor<
                                                          BoundedQueue<TYPE> >;

    // PRIVATE CLASS METHODS

    /// Return `true` if the specified `lhs` is circularly greater than the
    /// specified `rhs`, and `false` otherwise.  `lhs` is cicularly greater
    /// than `rhs` if `lhs` is equal to a value obtained by adding a value
    /// in `[1 .. 2^31]` to `rhs`.
    static bool circularlyGreater(Uint lhs, Uint rhs);

    /// Return `true` if the specified `count` implies a quiescent state (see
    /// **Implementation Note**), and `false` otherwise.  A quiescent state
    /// indicates there is a (possibly zero length) contiguous set of elements
    /// that can safely be made available to the operation complementary to the
    /// operation `count` tracks (pop and push are complementary operations).
    static bool isQuiescentState(bsls::Types::Uint64 count);

    /// Update the specified `count` to indicate that an operation (or the
    /// optionally specified `num` operations) has completed, and return the
    /// updated `count` value.  Marking an operation finished means that
    /// `isQuiscentState` **may** now be true.  The behavior is undefined
    /// unless `markStartedOperation` was previously called on this `count`,
    /// and a corresponding `markFinishedOperation` or `unmarkStartOperation`
    /// has not already been called.  Note that the "operation" that has
    /// finished refers to either a push or pop (depending on whether this is
    /// applied to `d_pushCount` or `d_popCount`).
    static Uint64 markFinishedOperation(AtomicUint64 *count);
    static Uint64 markFinishedOperation(AtomicUint64 *count, int num);

    /// Update the specified `count` to indicate that a node that suffered an
    /// exception has been reclaimed.  Marking a node reclaimed can *not* alter
    /// the value of `isQuiescentState`, but does increase the size of the
    /// contiguous set of elements that will eventially be made available to
    /// the operation complementary to the operation `count` tracks (pop and
    /// push are complementary operations).  Note that this method does not
    /// require a previous call to `markStartedOperation` and does not meet the
    /// requirements for `markFinishedOperation` or `unmarkStartOperation` to
    /// be invoked.
    static void markReclaimed(AtomicUint64 *count);

    /// Update the specified `count` to indicate that an operation (or the
    /// optionally specified `num` operations) has started, and return the
    /// updated `count` value.  Marking an operation started means that
    /// `isQuiescentState` is not true and will not be true until
    /// `markFinishedOperation` or `unmarkStartedOperation` is invoked.  Note
    /// that the "operation" that has started refers to either a push or pop
    /// (depending on whether this is applied to `d_pushCount` or
    /// `d_popCount`).
    static void markStartedOperation(AtomicUint64 *count);
    static void markStartedOperation(AtomicUint64 *count, int num);

    /// Update the specified `count` to indicate that an operation has aborted
    /// without finishing, and return the updated `count` value.  Marking a
    /// started operation as having aborted means that `isQuiscentState` *may*
    /// now be true.  The behavior is undefined unless `markStartedOperation`
    /// was previously called on this `count`, and a corresponding
    /// `markFinishedOperation` or `unmarkStartOperation` has not already been
    /// called.  Note that the "operation" that has aborted refers to either a
    /// push or pop (depending on whether this is applied to `d_pushCount` or
    /// `d_popCount`).
    static Uint64 unmarkStartedOperation(AtomicUint64 *count);

    // PRIVATE MANIPULATORS

    /// Destruct the value stored in the specified `node`, and mark the `node`
    /// writable.  This method is used within `popFrontHelper` by a guard to
    /// complete the reclamation of a node in the presence of an exception.
    void popComplete(Node *node);

    /// Remove the element from the front of this queue and load that element
    /// into the specified `value`.  This method is invoked by `popFront` and
    /// `tryPopFront` once an element is available.
    void popFrontHelper(TYPE *value);

    /// Mark a "push" operation as complete, and `post` to the `d_popSemaphore`
    /// if appropriate.
    void pushComplete();

    /// Remove the indicator for a started push operation, and `post` to the
    /// `d_popSemaphore` if appropriate.  This method is used within
    /// `pushFront` by a proctor to complete the marking of a node to reclaim
    /// in the presence of an exception.
    void pushExceptionComplete();

    /// If the specified `emptyCount` is (circularly) greater than
    /// `d_emptyCountSeen`, assign `d_emptyCountSeen` the value of `emptyCount`
    /// and return `true`.  Otherwise, return `false`.  A return value of
    /// `true` indicates this thread *must* signal any waiting threads.  Note
    /// that a return value of `false` indicates another thread has (or will)
    /// signal the queue is empty and this thread does not need to signal.
    bool updateEmptyCountSeen(Uint emptyCount);

    // NOT IMPLEMENTED
    BoundedQueue(const BoundedQueue&);
    BoundedQueue& operator=(const BoundedQueue&);

  public:
    // TRAITS
    BSLMF_NESTED_TRAIT_DECLARATION(BoundedQueue, bslma::UsesBslmaAllocator);

    // PUBLIC TYPES
    typedef TYPE value_type;  // The type for elements.

    // PUBLIC CONSTANTS
    enum {
        e_SUCCESS  =  0,  // must be 0
        e_EMPTY    = -1,
        e_FULL     = -2,
        e_DISABLED = -3,
        e_FAILED   = -4
    };

    // CREATORS

    /// Create a thread-aware queue with at least the specified `capacity`.
    /// Optionally specify a `basicAllocator` used to supply memory.  If
    /// `basicAllocator` is 0, the currently installed default allocator is
    /// used.
    explicit
    BoundedQueue(bsl::size_t capacity, bslma::Allocator *basicAllocator = 0);

    /// Destroy this object.
    ~BoundedQueue();

    // MANIPULATORS

    /// Remove the element from the front of this queue and load that
    /// element into the specified `value`.  If the queue is empty, block
    /// until it is not empty.  Return 0 on success, and a non-zero value
    /// otherwise.  Specifically, return `e_SUCCESS` on success,
    /// `e_DISABLED` if `isPopFrontDisabled()` and `e_FAILED` if an error
    /// occurs.  On failure, `value` is not changed.  Threads blocked due to
    /// the queue being empty will return `e_DISABLED` if `disablePopFront`
    /// is invoked.
    int popFront(TYPE *value);

    /// Append the specified `value` to the back of this queue.  If the
    /// queue is full, block until it is not full.  Return 0 on success, and
    /// a non-zero value otherwise.  Specifically, return `e_SUCCESS` on
    /// success, `e_DISABLED` if `isPushBackDisabled()` and `e_FAILED` if an
    /// error occurs.  Threads blocked due to the queue being full will
    /// return `e_DISABLED` if `disablePushBack` is invoked.
    int pushBack(const TYPE& value);

    /// Append the specified move-insertable `value` to the back of this
    /// queue.  If the queue is full, block until it is not full.  `value`
    /// is left in a valid but unspecified state.  Return 0 on success, and
    /// a non-zero value otherwise.  Specifically, return `e_SUCCESS` on
    /// success, `e_DISABLED` if `isPushBackDisabled()` and `e_FAILED` if an
    /// error occurs.  On failure, `value` is not changed.  Threads blocked
    /// due to the queue being full will return `e_DISABLED` if
    /// `disablePushBack` is invoked.
    int pushBack(bslmf::MovableRef<TYPE> value);

    /// Remove all items currently in this queue.  Note that this operation
    /// is not atomic; if other threads are concurrently pushing items into
    /// the queue the result of `numElements()` after this function returns
    /// is not guaranteed to be 0.
    void removeAll();

    /// Attempt to remove the element from the front of this queue without
    /// blocking, and, if successful, load the specified `value` with the
    /// removed element.  Return 0 on success, and a non-zero value
    /// otherwise.  Specifically, return `e_SUCCESS` on success,
    /// `e_DISABLED` if `isPopFrontDisabled()`, `e_EMPTY` if
    /// `!isPopFrontDisabled()` and the queue was empty, and `e_FAILED` if
    /// an error occurs.  On failure, `value` is not changed.
    int tryPopFront(TYPE *value);

    /// Append the specified `value` to the back of this queue.  Return 0 on
    /// success, and a non-zero value otherwise.  Specifically, return
    /// `e_SUCCESS` on success, `e_DISABLED` if `isPushBackDisabled()`,
    /// `e_FULL` if `!isPushBackDisabled()` and the queue was full, and
    /// `e_FAILED` if an error occurs.
    int tryPushBack(const TYPE& value);

    /// Append the specified move-insertable `value` to the back of this
    /// queue.  `value` is left in a valid but unspecified state.  Return 0
    /// on success, and a non-zero value otherwise.  Specifically, return
    /// `e_SUCCESS` on success, `e_DISABLED` if `isPushBackDisabled()`,
    /// `e_FULL` if `!isPushBackDisabled()` and the queue was full, and
    /// `e_FAILED` if an error occurs.  On failure, `value` is not changed.
    int tryPushBack(bslmf::MovableRef<TYPE> value);

                       // Enqueue/Dequeue State

    /// Disable dequeueing from this queue.  All subsequent invocations of
    /// `popFront` or `tryPopFront` will fail immediately.  All blocked
    /// invocations of `popFront` and `waitUntilEmpty` will fail
    /// immediately.  If the queue is already dequeue disabled, this method
    /// has no effect.
    void disablePopFront();

    /// Disable enqueueing into this queue.  All subsequent invocations of
    /// `pushBack` or `tryPushBack` will fail immediately.  All blocked
    /// invocations of `pushBack` will fail immediately.  If the queue is
    /// already enqueue disabled, this method has no effect.
    void disablePushBack();

    /// Enable dequeueing.  If the queue is not dequeue disabled, this call
    /// has no effect.
    void enablePopFront();

    /// Enable queuing.  If the queue is not enqueue disabled, this call has
    /// no effect.
    void enablePushBack();

    // ACCESSORS

    /// Return the maximum number of elements that may be stored in this queue.
    /// Note that the value returned may be greater than that supplied at
    /// construction.
    bsl::size_t capacity() const;

    /// Return `true` if this queue is empty (has no elements), or `false`
    /// otherwise.
    bool isEmpty() const;

    /// Return `true` if this queue is full (has no available capacity), or
    /// `false` otherwise.  Note that for unbounded queues, this method always
    /// returns `false`.
    bool isFull() const;

    /// Return `true` if this queue is dequeue disabled, and `false` otherwise.
    /// Note that the queue is created in the "dequeue enabled" state.
    bool isPopFrontDisabled() const;

    /// Return `true` if this queue is enqueue disabled, and `false` otherwise.
    /// Note that the queue is created in the "enqueue enabled" state.
    bool isPushBackDisabled() const;

    /// Returns the number of elements currently in this queue.  Note that
    /// `numElements() == capacity()` is not a valid replacement for `isFull`
    /// (see {Exception Safety} for details).
    bsl::size_t numElements() const;

    /// Block until all the elements in this queue are removed.  Return 0 on
    /// success, and a non-zero value otherwise.  Specifically, return
    /// `e_SUCCESS` on success, `e_DISABLED` if `!isEmpty() &&
    /// isPopFrontDisabled()`.  A blocked thread waiting for the queue to empty
    /// will return `e_DISABLED` if `disablePopFront` is invoked.
    int waitUntilEmpty() const;

                                  // Aspects

    /// Return the allocator used by this object to supply memory.
    bslma::Allocator *allocator() const;
};

// ============================================================================
//                             INLINE DEFINITIONS
// ============================================================================

                   // -----------------------------------
                   // class BoundedQueue_PopCompleteGuard
                   // -----------------------------------

// CREATORS
template <class TYPE, class NODE>
inline
BoundedQueue_PopCompleteGuard<TYPE, NODE>::
                         BoundedQueue_PopCompleteGuard(TYPE *queue, NODE *node)
: d_queue_p(queue)
, d_node_p(node)
{
}

template <class TYPE, class NODE>
inline
BoundedQueue_PopCompleteGuard<TYPE, NODE>::~BoundedQueue_PopCompleteGuard()
{
    d_queue_p->popComplete(d_node_p);
}

             // -----------------------------------------------
             // class BoundedQueue_PushExceptionCompleteProctor
             // -----------------------------------------------

// CREATORS
template <class TYPE>
inline
BoundedQueue_PushExceptionCompleteProctor<TYPE>::
                         BoundedQueue_PushExceptionCompleteProctor(TYPE *queue)
: d_queue_p(queue)
{
}

template <class TYPE>
inline
BoundedQueue_PushExceptionCompleteProctor<TYPE>::
                                   ~BoundedQueue_PushExceptionCompleteProctor()
{
    if (d_queue_p) {
        d_queue_p->pushExceptionComplete();
    }
}

// MANIPULATORS
template <class TYPE>
inline
void BoundedQueue_PushExceptionCompleteProctor<TYPE>::release()
{
    d_queue_p = 0;
}

                         // ------------------------
                         // struct BoundedQueue_Node
                         // ------------------------

// MANIPULATORS
template <class TYPE>
inline
void BoundedQueue_Node<TYPE, true>::setIsUnconstructed(
                                                      bool isUnconstructedFlag)
{
    d_isUnconstructedFlag = isUnconstructedFlag;
}

template <class TYPE>
inline
void BoundedQueue_Node<TYPE, false>::setIsUnconstructed(
                                                 bool /* isUnconstrutedFlag */)
{
}

// ACCESSORS
template <class TYPE>
inline
bool BoundedQueue_Node<TYPE, true>::isUnconstructed() const
{
    return d_isUnconstructedFlag;
}

template <class TYPE>
inline
bool BoundedQueue_Node<TYPE, false>::isUnconstructed() const
{
    return false;
}

                            // ------------------
                            // class BoundedQueue
                            // ------------------

// PRIVATE CLASS METHODS
template <class TYPE>
inline
bool BoundedQueue<TYPE>::circularlyGreater(Uint lhs, Uint rhs)
{
    return lhs > rhs ? (lhs - rhs) <= k_MAXIMUM_CIRCULAR_DIFFERENCE
                     : (rhs - lhs) >  k_MAXIMUM_CIRCULAR_DIFFERENCE;
}

template <class TYPE>
inline
bool BoundedQueue<TYPE>::isQuiescentState(bsls::Types::Uint64 count)
{
    return (count >> k_FINISHED_SHIFT) == (count & k_STARTED_MASK);
}

template <class TYPE>
inline
bsls::Types::Uint64 BoundedQueue<TYPE>::markFinishedOperation(
                                                           AtomicUint64 *count)
{
    return AtomicOp::addUint64NvAcqRel(count, k_FINISHED_INC);
}

template <class TYPE>
inline
bsls::Types::Uint64 BoundedQueue<TYPE>::markFinishedOperation(
                                                           AtomicUint64 *count,
                                                           int           num)
{
    return AtomicOp::addUint64NvAcqRel(count, num * k_FINISHED_INC);
}

template <class TYPE>
inline
void BoundedQueue<TYPE>::markReclaimed(AtomicUint64 *count)
{
    AtomicOp::addUint64AcqRel(count, k_STARTED_INC + k_FINISHED_INC);
}

template <class TYPE>
inline
void BoundedQueue<TYPE>::markStartedOperation(AtomicUint64 *count)
{
    AtomicOp::addUint64AcqRel(count, k_STARTED_INC);
}

template <class TYPE>
inline
void BoundedQueue<TYPE>::markStartedOperation(AtomicUint64 *count, int num)
{
    AtomicOp::addUint64AcqRel(count, num * k_STARTED_INC);
}

template <class TYPE>
inline
bsls::Types::Uint64 BoundedQueue<TYPE>::unmarkStartedOperation(
                                                           AtomicUint64 *count)
{
    return AtomicOp::addUint64NvAcqRel(count, k_STARTED_DEC);
}

// PRIVATE MANIPULATORS
template <class TYPE>
inline
void BoundedQueue<TYPE>::popComplete(Node *node)
{
    node->d_value.object().~TYPE();

    Uint64 count = markFinishedOperation(&d_popCount);
    if (isQuiescentState(count)) {

        // The total number of popped elements is 'count & k_STARTED_MASK'.
        // Attempt, once, to zero the count and, if successful, post to the
        // push semaphore.

        if (AtomicOp::testAndSwapUint64AcqRel(&d_popCount,
                                              count,
                                              0) == count) {
            d_pushSemaphore.postWithRedundantSignal(
                                      static_cast<int>(count & k_STARTED_MASK),
                                      static_cast<int>(d_capacity),
                                      1);

            Uint emptyCount = AtomicOp::getUintAcquire(&d_emptyWaiterCount);

            if (isEmpty() && updateEmptyCountSeen(emptyCount)) {
                {
                    bslmt::LockGuard<bslmt::Mutex> guard(&d_emptyMutex);
                }
                d_emptyCondition.broadcast();
            }
        }
    }
}

template <class TYPE>
void BoundedQueue<TYPE>::popFrontHelper(TYPE *value)
{
    markStartedOperation(&d_popCount);

    // 'd_popIndex' stores the next location to use (want the original value)

    Uint64  index = (AtomicOp::addUint64NvAcqRel(&d_popIndex, 1) - 1)
                                                                  % d_capacity;
    Node   *node  = &d_element_p[index];

    // Nodes marked for reclamation are not counted in 'd_popSemaphore' and are
    // to be skipped; 'd_isUnconstructed' does not need to be modified here
    // since it will be updated in a "push" operation.  However, the node does
    // need to be counted in the 'd_pushSemaphore' as an empty node.  This is
    // accomplished by incrementing the started and finished attributes in
    // 'd_popCount'.

    while (node->isUnconstructed()) {
        markReclaimed(&d_popCount);

        index = (AtomicOp::addUint64NvAcqRel(&d_popIndex, 1) - 1) % d_capacity;
        node  = &d_element_p[index];
    }

    BoundedQueue_PopCompleteGuard<BoundedQueue<TYPE>, Node> guard(this, node);

#if defined(BSLMF_MOVABLEREF_USES_RVALUE_REFERENCES)
    *value = bslmf::MovableRefUtil::move(node->d_value.object());
#else
    *value = node->d_value.object();
#endif
}

template <class TYPE>
inline
void BoundedQueue<TYPE>::pushComplete()
{
    Uint64 count = markFinishedOperation(&d_pushCount);
    if (isQuiescentState(count)) {

        // The total number of pushed elements is 'count & k_STARTED_MASK'.
        // Attempt, once, to zero the count and, if successful, post to the pop
        // semaphore.

        if (AtomicOp::testAndSwapUint64AcqRel(&d_pushCount,
                                               count,
                                               0) == count) {
            d_popSemaphore.postWithRedundantSignal(
                                      static_cast<int>(count & k_STARTED_MASK),
                                      static_cast<int>(d_capacity),
                                      1);
        }
    }
}

template <class TYPE>
inline
void BoundedQueue<TYPE>::pushExceptionComplete()
{
    Uint64 count = unmarkStartedOperation(&d_pushCount);

    int numToPost = static_cast<int>(count & k_STARTED_MASK);

    if (0 != numToPost && isQuiescentState(count)) {

        // The total number of pushed elements is 'count & k_STARTED_MASK'.
        // Attempt, once, to zero the count and, if successful, post to the pop
        // semaphore.

        if (AtomicOp::testAndSwapUint64AcqRel(&d_pushCount,
                                               count,
                                               0) == count) {
            d_popSemaphore.post(numToPost);
        }
    }
}

template <class TYPE>
inline
bool BoundedQueue<TYPE>::updateEmptyCountSeen(Uint emptyCount)
{
    Uint emptyCountSeen = AtomicOp::getUintAcquire(&d_emptyCountSeen);
    while (circularlyGreater(emptyCount, emptyCountSeen)) {
        const Uint origEmptyCountSeen = emptyCountSeen;

        emptyCountSeen = AtomicOp::testAndSwapUintAcqRel(&d_emptyCountSeen,
                                                         emptyCountSeen,
                                                         emptyCount);

        if (origEmptyCountSeen == emptyCountSeen) {
            return true;                                              // RETURN
        }
    }
    return false;
}

// CREATORS
template <class TYPE>
BoundedQueue<TYPE>::BoundedQueue(bsl::size_t       capacity,
                                 bslma::Allocator *basicAllocator)
: d_pushSemaphore()
, d_popSemaphore()
, d_emptyMutex()
, d_emptyCondition()
, d_element_p(0)
, d_capacity(capacity > 2 ? capacity : 2)
, d_allocator_p(bslma::Default::allocator(basicAllocator))
{
    AtomicOp::initUint64(&d_pushCount, 0);
    AtomicOp::initUint64(&d_pushIndex, 0);
    AtomicOp::initUint64(&d_popCount,  0);
    AtomicOp::initUint64(&d_popIndex,  0);

    AtomicOp::initUint(&d_emptyWaiterCount, 0);
    AtomicOp::initUint(&d_emptyCountSeen,   0);

    d_element_p = static_cast<Node *>(
                              d_allocator_p->allocate(static_cast<bsl::size_t>(
                                                  d_capacity * sizeof(Node))));

    for (bsl::size_t i = 0; i < d_capacity; ++i) {
        d_element_p[i].setIsUnconstructed(false);
    }

    d_pushSemaphore.post(static_cast<int>(d_capacity));
}

template <class TYPE>
BoundedQueue<TYPE>::~BoundedQueue()
{
    if (d_element_p) {
        removeAll();
        d_allocator_p->deallocate(d_element_p);
    }
}

// MANIPULATORS
template <class TYPE>
inline
int BoundedQueue<TYPE>::popFront(TYPE *value)
{
    int rv = d_popSemaphore.wait();
    if (rv) {
        if (bslmt::FastPostSemaphore::e_DISABLED == rv) {
            return e_DISABLED;                                        // RETURN
        }
        return e_FAILED;                                              // RETURN
    }

    popFrontHelper(value);

    return e_SUCCESS;
}

template <class TYPE>
int BoundedQueue<TYPE>::pushBack(const TYPE& value)
{
    int rv = d_pushSemaphore.wait();
    if (rv) {
        if (bslmt::FastPostSemaphore::e_DISABLED == rv) {
            return e_DISABLED;                                        // RETURN
        }
        return e_FAILED;                                              // RETURN
    }

    markStartedOperation(&d_pushCount);

    // 'd_pushIndex' stores the next location to use (want the original value)

    Uint64 index = (AtomicOp::addUint64NvAcqRel(&d_pushIndex, 1) - 1)
                                                                  % d_capacity;
    Node&  node  = d_element_p[index];

    node.setIsUnconstructed(true);

    BoundedQueue_PushExceptionCompleteProctor<BoundedQueue<TYPE> > guard(this);

    bslalg::ScalarPrimitives::copyConstruct(node.d_value.address(),
                                            value,
                                            d_allocator_p);

    guard.release();

    node.setIsUnconstructed(false);

    pushComplete();

    return e_SUCCESS;
}

template <class TYPE>
int BoundedQueue<TYPE>::pushBack(bslmf::MovableRef<TYPE> value)
{
    int rv = d_pushSemaphore.wait();
    if (rv) {
        if (bslmt::FastPostSemaphore::e_DISABLED == rv) {
            return e_DISABLED;                                        // RETURN
        }
        return e_FAILED;                                              // RETURN
    }

    markStartedOperation(&d_pushCount);

    // 'd_pushIndex' stores the next location to use (want the original value)

    Uint64 index = (AtomicOp::addUint64NvAcqRel(&d_pushIndex, 1) - 1)
                                                                  % d_capacity;
    Node&  node  = d_element_p[index];

    node.setIsUnconstructed(true);

    BoundedQueue_PushExceptionCompleteProctor<BoundedQueue<TYPE> > guard(this);

    TYPE& dummy = value;
    bslalg::ScalarPrimitives::moveConstruct(node.d_value.address(),
                                            dummy,
                                            d_allocator_p);

    guard.release();

    node.setIsUnconstructed(false);

    pushComplete();

    return e_SUCCESS;
}

template <class TYPE>
void BoundedQueue<TYPE>::removeAll()
{
    int reclaim = d_popSemaphore.takeAll();

    if (reclaim) {
        while (reclaim) {
            int count = reclaim;

            reclaim = 0;

            // For quiescent state detection (see *Implementation* *Note*) and
            // eventual 'post' to the 'd_pushSemaphore' to indicate node
            // availability, indicate 'count' remove operations have begin.

            markStartedOperation(&d_popCount, count);

            // 'd_popIndex' stores the next location to use (want the original
            // value)

            Uint64 index = AtomicOp::addUint64NvAcqRel(&d_popIndex, count)
                                                                       - count;

            for (int i = 0; i < count; ++i, ++index) {
                Node& node = d_element_p[index % d_capacity];

                if (!node.isUnconstructed()) {
                    node.d_value.object().~TYPE();
                }
                else {
                    ++reclaim;
                }
            }

            // For quiescent state detection (see *Implementation* *Note*) and
            // eventual 'post' to the 'd_pushSemaphore' to indicate node
            // availability, indicate 'count' remove operations have finished.

            Uint64 popCount = markFinishedOperation(&d_popCount, count);

            if (isQuiescentState(popCount)) {
                // The total number of popped elements is
                // 'popCount & k_STARTED_MASK'.  Attempt, once, to zero the
                // count and, if successful, post to the push semaphore.

                if (AtomicOp::testAndSwapUint64AcqRel(&d_popCount,
                                                      popCount,
                                                      0) == popCount) {
                    d_pushSemaphore.post(static_cast<int>(
                                                   popCount & k_STARTED_MASK));
                }

                Uint emptyCount = AtomicOp::getUintAcquire(
                                                          &d_emptyWaiterCount);

                if (isEmpty() && updateEmptyCountSeen(emptyCount)) {
                    {
                        bslmt::LockGuard<bslmt::Mutex> guard(&d_emptyMutex);
                    }
                    d_emptyCondition.broadcast();
                }
            }
        }
    }
}

template <class TYPE>
inline
int BoundedQueue<TYPE>::tryPopFront(TYPE *value)
{
    int rv = d_popSemaphore.tryWait();
    if (rv) {
        if (bslmt::FastPostSemaphore::e_DISABLED == rv) {
            return e_DISABLED;                                        // RETURN
        }
        if (bslmt::FastPostSemaphore::e_WOULD_BLOCK == rv) {
            return e_EMPTY;                                           // RETURN
        }
        return e_FAILED;                                              // RETURN
    }

    popFrontHelper(value);

    return e_SUCCESS;
}

template <class TYPE>
int BoundedQueue<TYPE>::tryPushBack(const TYPE& value)
{
    int rv = d_pushSemaphore.tryWait();
    if (rv) {
        if (bslmt::FastPostSemaphore::e_DISABLED == rv) {
            return e_DISABLED;                                        // RETURN
        }
        if (bslmt::FastPostSemaphore::e_WOULD_BLOCK == rv) {
            return e_FULL;                                            // RETURN
        }
        return e_FAILED;                                              // RETURN
    }

    markStartedOperation(&d_pushCount);

    // 'd_pushIndex' stores the next location to use (want the original value)

    Uint64 index = (AtomicOp::addUint64NvAcqRel(&d_pushIndex, 1) - 1)
                                                                  % d_capacity;
    Node&  node  = d_element_p[index];

    node.setIsUnconstructed(true);

    BoundedQueue_PushExceptionCompleteProctor<BoundedQueue<TYPE> > guard(this);

    bslalg::ScalarPrimitives::copyConstruct(node.d_value.address(),
                                            value,
                                            d_allocator_p);

    guard.release();

    node.setIsUnconstructed(false);

    pushComplete();

    return e_SUCCESS;
}

template <class TYPE>
int BoundedQueue<TYPE>::tryPushBack(bslmf::MovableRef<TYPE> value)
{
    int rv = d_pushSemaphore.tryWait();
    if (rv) {
        if (bslmt::FastPostSemaphore::e_DISABLED == rv) {
            return e_DISABLED;                                        // RETURN
        }
        if (bslmt::FastPostSemaphore::e_WOULD_BLOCK == rv) {
            return e_FULL;                                            // RETURN
        }
        return e_FAILED;                                              // RETURN
    }

    markStartedOperation(&d_pushCount);

    // 'd_pushIndex' stores the next location to use (want the original value)

    Uint64 index = (AtomicOp::addUint64NvAcqRel(&d_pushIndex, 1) - 1)
                                                                  % d_capacity;
    Node&  node  = d_element_p[index];

    node.setIsUnconstructed(true);

    BoundedQueue_PushExceptionCompleteProctor<BoundedQueue<TYPE> > guard(this);

    TYPE& dummy = value;
    bslalg::ScalarPrimitives::moveConstruct(node.d_value.address(),
                                            dummy,
                                            d_allocator_p);

    guard.release();

    node.setIsUnconstructed(false);

    pushComplete();

    return e_SUCCESS;
}

                       // Enqueue/Dequeue State

template <class TYPE>
inline
void BoundedQueue<TYPE>::disablePopFront()
{
    d_popSemaphore.disable();

    {
        bslmt::LockGuard<bslmt::Mutex> guard(&d_emptyMutex);
    }
    d_emptyCondition.broadcast();
}

template <class TYPE>
inline
void BoundedQueue<TYPE>::disablePushBack()
{
    d_pushSemaphore.disable();
}

template <class TYPE>
inline
void BoundedQueue<TYPE>::enablePopFront()
{
    d_popSemaphore.enable();
}

template <class TYPE>
inline
void BoundedQueue<TYPE>::enablePushBack()
{
    d_pushSemaphore.enable();
}

// ACCESSORS
template <class TYPE>
inline
bsl::size_t BoundedQueue<TYPE>::capacity() const
{
    return static_cast<bsl::size_t>(d_capacity);
}

template <class TYPE>
inline
bool BoundedQueue<TYPE>::isEmpty() const
{
    return d_capacity == static_cast<Uint64>(d_pushSemaphore.getValue());
}

template <class TYPE>
inline
bool BoundedQueue<TYPE>::isFull() const
{
    return 0 == d_pushSemaphore.getValue();
}

template <class TYPE>
inline
bool BoundedQueue<TYPE>::isPopFrontDisabled() const
{
    return d_popSemaphore.isDisabled();
}

template <class TYPE>
inline
bool BoundedQueue<TYPE>::isPushBackDisabled() const
{
    return d_pushSemaphore.isDisabled();
}

template <class TYPE>
inline
bsl::size_t BoundedQueue<TYPE>::numElements() const
{
    return d_popSemaphore.getValue();
}

template <class TYPE>
int BoundedQueue<TYPE>::waitUntilEmpty() const
{
    Uint emptyCount = AtomicOp::addUintNvAcqRel(&d_emptyWaiterCount, 1) - 1;

    int state = d_popSemaphore.getDisabledState();
    if (1 == (state & 1)) {
        return e_DISABLED;                                            // RETURN
    }

    if (isEmpty()) {
        return e_SUCCESS;                                             // RETURN
    }

    bslmt::LockGuard<bslmt::Mutex> guard(&d_emptyMutex);

    // Return successfully when this queue is empty ('isEmpty()') or this queue
    // was empty at some point since this method was invoked (the condition
    // tested by 'circularlyGreater' in the below).

    bool empty = isEmpty()
              || circularlyGreater(AtomicOp::getUintAcquire(&d_emptyCountSeen),
                                   emptyCount);

    while (!empty && state == d_popSemaphore.getDisabledState()) {
        int rv = d_emptyCondition.wait(&d_emptyMutex);
        if (rv) {
            return e_FAILED;                                          // RETURN
        }
        empty = isEmpty()
             || circularlyGreater(AtomicOp::getUintAcquire(&d_emptyCountSeen),
                                  emptyCount);
    }

    if (!empty) {
        return e_DISABLED;                                            // RETURN
    }

    return e_SUCCESS;
}

                                  // Aspects

template <class TYPE>
inline
bslma::Allocator *BoundedQueue<TYPE>::allocator() const
{
    return d_allocator_p;
}

}  // close package namespace
}  // close enterprise namespace

#endif

// ----------------------------------------------------------------------------
// Copyright 2019 Bloomberg Finance L.P.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------- END-OF-FILE ----------------------------------
